package com.sisyphus

import java.text.SimpleDateFormat
import java.util.Date

import com.sisyphus.LogAnalysis.logger
import com.sisyphus.source.{KafkaSource, MySQLSource}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
 * Business requirement: compute the traffic produced by each user within each one-minute window.
 *
 * Pipeline: tab-separated log lines from Kafka -> cleaned `(time, domain, traffic)` records ->
 * domain replaced by a user id using a domain -> user mapping streamed from MySQL -> traffic
 * summed per user over 60-second tumbling event-time windows -> printed.
 */
object LogAnalysis02 {

  /**
   * Builds and runs the streaming job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // 1. Execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Windows are driven by the timestamp carried inside each log line (event time).
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 2. Source: raw log lines from Kafka
    val data = env.addSource(new KafkaSource().flinkKafkaConsumer())

    // 3. Transformation
    /*
     * Data cleaning: keep only level "E" lines and project them to (time, domain, traffic).
     *
     *   name   country  level  time                 ip              domain        traffic
     *   imooc  CN       E      2020-07-27 20:51:13  183.225.139.16  v2.go2yd.com  7405
     *   ==>
     *   (1595854273000, v2.go2yd.com, 7405)
     *
     * The time is parsed in the JVM default time zone (the example above assumes UTC+8).
     * Lines with too few fields, or whose time/traffic cannot be parsed, are logged and
     * dropped instead of failing the whole job.
     */
    val cleanData = data.flatMap { line =>
      val splits = line.split("\t")
      if (splits.length < 7) {
        logger.error(s"malformed log line, expected 7 tab-separated fields: $line")
        Nil
      } else if (splits(2) != "E") {
        // Only level "E" records are of interest; skip parsing the rest entirely.
        Nil
      } else {
        val timeStr = splits(3)
        try {
          val time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime
          val traffic = splits(6).toLong
          List((time, splits(5), traffic))
        } catch {
          case e: Exception =>
            // Pass the exception itself so the logger records the cause and stack trace.
            logger.error(s"failed to parse log line (time: $timeStr): $line", e)
            Nil
        }
      }
    }

    // Domain -> user id mapping streamed from MySQL.
    val mysqlData = env.addSource(new MySQLSource)

    // FIXME: after connecting the MySQL stream, the windows were observed to emit no data;
    // still under investigation.
    // NOTE(review): the mapping stream is broadcast so that every parallel CoFlatMap subtask
    // receives it. Without broadcast, elements from the MySQL source are distributed to single
    // downstream subtasks, leaving the others with an empty mapping. Whether this was the cause
    // of the FIXME above has not been confirmed.
    val connectData = cleanData
      .connect(mysqlData.broadcast)
      .flatMap(new CoFlatMapFunction[(Long, String, Long), mutable.HashMap[String, String], (Long, String, Long)] {

        // Most recent domain -> user id mapping received; empty until the first one arrives.
        private var userDomainMap = mutable.HashMap[String, String]()

        // Log side: replace the domain with its user id ("" when the domain is unknown).
        override def flatMap1(value: (Long, String, Long), out: Collector[(Long, String, Long)]): Unit = {
          val (time, domain, traffic) = value
          out.collect((time, userDomainMap.getOrElse(domain, ""), traffic))
        }

        // MySQL side: replace the whole mapping with the newest snapshot.
        override def flatMap2(value: mutable.HashMap[String, String], out: Collector[(Long, String, Long)]): Unit = {
          userDomainMap = value
        }
      })

    /*
     * Timestamps and watermarks: the event time is the record's first field, and the emitted
     * watermark trails the largest timestamp seen so far by one minute to tolerate late records.
     */
    val watermarkData = connectData.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Long, String, Long)] {

      // Maximum tolerated out-of-orderness.
      private val maxOutOfOrderness = 60 * 1000L // 60 seconds

      // Largest event timestamp seen so far by this assigner instance (0 before any record).
      private var currentMaxTimestamp = 0L

      override def getCurrentWatermark: Watermark =
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)

      override def extractTimestamp(element: (Long, String, Long), previousElementTimestamp: Long): Long = {
        val timestamp = element._1
        currentMaxTimestamp = math.max(timestamp, currentMaxTimestamp)
        timestamp
      }
    })

    /*
     * Business logic: per user id, sum the traffic of each 60-second tumbling event-time window.
     * Output tuple:
     *   1. the window's minute, formatted "yyyy-MM-dd HH:mm"
     *   2. the user id
     *   3. the total traffic in that minute
     */
    val res = watermarkData
      .keyBy(_._2)
      .window(TumblingEventTimeWindows.of(Time.seconds(60)))
      .apply(new WindowFunction[(Long, String, Long), (String, String, Long), String, TimeWindow] {
        override def apply(userId: String, window: TimeWindow,
                           input: Iterable[(Long, String, Long)],
                           out: Collector[(String, String, Long)]): Unit = {
          val totalTraffic = input.foldLeft(0L)(_ + _._3)
          // All records of a one-minute tumbling window fall inside the minute that starts at
          // window.getStart, so the window start identifies the minute without buffering times.
          val minute = new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(window.getStart))
          out.collect((minute, userId, totalTraffic))
        }
      })

    // 4. Sink
    res.print()
    // res.addSink(new KafkaSink().flinkKafkaProducer())

    // 5. Execute
    env.execute("LogAnalysis2")
  }
}
